OPC UA defines a generic PubSub model where the different message mappings can be bound to different underlying transport protocols. This annex provides transport mappings to other Message Oriented Middleware protocols not part of the normative part of PubSub specification.
The mappings in this informative annex are intended to be complete and interoparable but the OPC Foundation does not intend to provide compliance testing for these mappings at this time.
OPC UA defines a generic PubSub model that can be bound to different underlying protocols. This section describes how OPC UA PubSub can be bound to Kafka, a distributed streaming platform that allows publishers and subscribers to exchange messages through topics. Kafka is widely used for data integration, stream processing, and event-driven applications.
The transport protocol binding of OPC UA to Kafka is based on the following assumptions and conventions:
- Kafka brokers are used to manage the topics and partitions, and to store the messages.
- Kafka producers are used to publish OPC UA NetworkMessages to Kafka topics. Topics must be created in the broker before they can be used by producers.
- Kafka consumers are used to subscribe to Kafka topics and receive OPC UA NetworkMessages.
- Kafka topics are configured throgh QueueNames in WriterGroups, DataSetWriters and DataSetReaders.
A Broker may persist messages so they can be delivered even if the subscriber is not online.
Kafka security is applied at the transport level, using the SSL/TLS or SASL mechanisms supported by Kafka. The Kafka security configuration is independent of the OPC UA security configuration and can be used to provide an additional layer of protection.
The OPC UA PubSub transport protocol mapping for Kafka enables the integration of OPC UA PubSub with Kafka-based systems, and the use of Kafka as a reliable and scalable transport for OPC UA PubSub.
The TransportProfileUri for Kafka transport protocol mapping with UADP message mapping is
http://opcfoundation.org/UA-Profile/Transport/pubsub-kafka-uadp
The TransportProfileUri for Kafka transport protocol mapping with JSON message mapping is
http://opcfoundation.org/UA-Profile/Transport/pubsub-kafka-json
The syntax of the Kafka transporting protocol URL used in the Address parameter defined in 6.2.7.3 has the following form:
kafka://<domain name>[:<port>]
The default port is 9092. The protocol prefix above provides transport security using TLS.
Authentication is performed according to the configured AuthenticationProfileUri of the PubSubConnection, DataSetWriterGroup, DataSetWriter or DataSetReader entities.
If no authentication information is provided in the form of ResourceUri and AuthenticationProfileUri, SASL Anonymous is implied.
For simple username/password authentication SASL PLAIN is used.
A JSON body is encoded as defined for the JSON message mapping defined in 7.2.4.6.9.
The optional Kafka header with ContentType key can be set to application/json when sending uncompressed JSON messages.
When sending a gzip (RFC 1952) compressed JSON message on Kafka the optional Kafka header with ContentType key can be set to application/json+gzip.
A UADP body is encoded as defined for the UADP message mapping defined in 7.2.3.
It is expected that the software used to receive UADP NetworkMessage can process the body without needing to know how it was transported.
When sending such message the optional Kafka header with ContentType key can be set to application/opcua+uadp.
The Advanced Message Queuing Protocol (AMQP) is an open standard application layer protocol for Message Oriented Middleware. AMQP is often used with a Broker that relays messages between applications that cannot communicate directly.
Publishers send AMQP messages to AMQP endpoints. Subscribers listen to AMQP endpoints for incoming messages. If a Broker is involved it may persist messages so they can be delivered even if the subscriber is not online. Brokers may also allow messages to be sent to multiple Subscribers.
The AMQP protocol defines a binary encoding for all messages with a header and a body. The header allows applications to insert additional information as name-value pairs that are serialized using the AMQP binary encoding. The body is an opaque binary blob that can contain any data serialized using an encoding chosen by the application.
This document defines two possible message mappings for the AMQP message body: the UADP message mapping defined in 7.2.3 and a JSON message mapping defined in 7.2.4.6.9. AMQP Brokers have an upper limit on message size. The limit is defined by the AMQP field max-message-size. The mechanism for handling NetworkMessages that exceed the Broker limits depends on the MessageMapping. For MessageMappings that support chunking, the NetworkMessage is broken into multiple chunks. The chunk size plus the AMQP header should not exceed the AMQP max-message-size. For MessageMappings that do not support chunking, the NetworkMessages exceeding the maximum size mut be skipped. Diagnostic information for such error scenarios are provided through the Events of the type PubSubTransportLimitsExceedEventType defined in 9.1.13.2 and through the FailedTransmissions counter of the PubSubDiagnosticsWriterGroupType defined in 9.1.11.9.
Security with AMQP is primary provided by a TLS connection between the Publisher or Subscriber and the AMQP Broker, however, this requires that the AMQP Broker be trusted. For that reason, it may be necessary to provide end-to-end security. Applications that require end-to-end security with AMQP need to use the UADP NetworkMessages and binary message encoding defined in 7.2.4.4. JSON encoded message bodies rely on the security mechanisms provided by AMQP and the AMQP Broker.
The syntax of the AMQP transporting protocol URL used in the Address parameter defined in 6.2.7.3 has the following form:
amqps://<domain name>[:<port>][/<path>]
The default port is 5671. The protocol prefix above provides transport security.
amqp://<domain name>[:<port>][/<path>]
The default port is 5672.
The syntax for an AMQP URL over Web Sockets has the following form:
wss://<domain name>[:<port>][/<path>]
The default port is 443.
Authentication is performed according to the configured AuthenticationProfileUri of the PubSubConnection, DataSetWriterGroup, DataSetWriter or DataSetReader entities.
If no authentication information is provided in the form of ResourceUri and AuthenticationProfileUri, SASL Anonymous is implied.
If the authentication profile specifies SASL PLAIN authentication, a separate connection for each new Authentication setting is required.
AMQP allows sending properties as part of opening the connection, session establishment and link attach.
The connection properties apply to any connection, session or link created as part of the PubSubConnection, or subordinate configuration entities, such as WriterGroup and DataSetWriter.
The properties are defined through the KeyValuePair array in the ConnectionProperties WriterGroupProperties and DataSetWriterProperties. The NamespaceIndex of the QualifiedName in the KeyValuePair is 0 for AMQP standard properties. The Name of the QualifiedName is constructed from a prefix and the AMQP property name with the following syntax.
Name = <target prefix>-<AMQP property name>
The target prefix can have the following values:
- Connection;
- session;
- link.
The Value of the KeyValuePair is converted to an AMQP data type using the rules defined in Table B.3. If there is no rule defined for a data type, the property is not included.
The connection properties are intended to be used sparingly to optimize interoperability with existing broker endpoints.
A writer negotiates the delivery guarantees for its link using the snd-settle-mode settlement policy (settled, unsettled, mixed) it will use, and the desired rcv-settle-mode (first, second) of the broker.
Vice versa, the reader negotiates delivery guarantees using its rcv-settle-mode (first, second) and the desired snd-settle-mode (settled, unsettled) of the broker.
This matches to the BrokerTransportQualityOfService values as follows:
- AtMostOnce or BestEffort – messages are pre-settled at the sender endpoint and not sent again. Messages may be lost in transit. This is the default setting.
- AtLeastOnce – messages are received and settled at the receiver without waiting for the sender to settle.
- ExactlyOnce – messages are received, the sender settles and then the receiver settles.
If the KeepAliveTime is set on a WriterGroup, a value slightly higher than the configured value of the group should be used as AMQP idle time-out of the AMQP connection ensuring that the connection is disconnected if the keep alive message was not sent by any writer. Otherwise, if no KeepAliveTime is specified, the implementation should set a reasonable default value.
When setting the maximum message sizes for the Link, the MaxNetworkMessageSize of the PubSubGroup is used. If this value is 0, the implementation chooses a reasonable maximum.
Other limits are up to the implementation and depend on the capabilities of the OS or on the capabilities of the device the Publisher or Subscriber is running on, and can be made configurable through configuration model extensions or by other means.
The AMQP message header has a number of standard fields which are called properties in the AMQP specification. Table B.1 describes how these fields are populated when an AMQP message is constructed.
Table B.1 – AMQP standard header fields
|
Field Name |
Source |
|
message-id |
A globally unique value per message. |
|
Subject |
Valid values are ua-data or ua-metadata. |
|
Content-type |
The MIME type for the message body. The MIME types are specified in the message body subclauses B.3.8.1 and B.3.8.3. |
The subject defines the type of the message contained in the AMQP body. A value of “ua-data” specifies the body contains a UADP or JSON NetworkMessage. A value of “ua-metadata” specifies a body that contains a UA Binary or JSON encoded DataSetMetaData Message. The content-type specifies the whether the message is binary or JSON data.
The AMQP message header includes additional fields defined on the WriterGroup or DataSetWriter through the KeyValuePair array in the WriterGroupProperties and DataSetWriterProperties. The NamespaceIndex of the QualifiedName in the KeyValuePair is 0 for AMQP standard message properties. The Name of the QualifiedName is constructed from a message prefix and the AMQP property name with the following syntax.
Name = message-<AMQP property name>
Table B.2 defines the AMQP standard message properties.
Table B.2 – OPC UA AMQP standard header QualifiedName Name mappings
|
AMQP standard property name |
OPC UA DataType |
AMQP data type |
Note |
|
To |
String |
* |
|
|
user-id |
ByteString |
binary |
|
|
reply-to |
String |
string |
|
|
correlation-id |
ByteString |
* |
|
|
absolute-expiry-time |
Duration |
timestamp |
The absolute-expiry-time is calculated by adding the message-absolute-expiry-time (Duration) from the DataSetWriterProperties to the current time of the DataSetMessage creation. |
|
Group-id |
String |
string |
|
|
reply-to-group-id |
String |
string |
|
|
creation-time |
Boolean |
timestamp |
The creation-time is set to the current time of the DataSetMessage creation if the message-creation-time (Boolean) in the DataSetWriterProperties is True, or else if the value is False or if the property is not configured, the AMQP property is not set. |
|
Content-encoding |
String |
symbol |
|
Any name not in the table is assumed to be an application property. In this case the namespace provided as part of the QualifiedName is the ApplicationUri.
The AMQP message header includes additional promoted fields of the DataSet as a list of name-value pairs. DataSet fields with the PromotedField flag set in the FieldMetaData fieldFlags are copied into the AMQP header. The FieldMetaData Structure is defined in 6.2.3.2.4. Promoted fields are always included in the header even if the DataSetMessage body is a delta frame and the DataSet field is not included in the delta frame. In this case the last known value is sent in the header.
When a field is added to the header it is converted to an AMQP data type using the rules defined in Table B.3. If there is no rule defined for the data type, the field are not included.
Table B.3 – OPC UA AMQP header field conversion rules
|
OPC UA DataType |
Conversion Rules to AMQP data types. |
|
Boolean |
AMQP ‘boolean’ type. |
|
Sbyte |
AMQP ‘byte’ type. |
|
Byte |
AMQP ‘ubyte’ type. |
|
Int16 |
AMQP ‘short’ type. |
|
UInt16 |
AMQP ‘ushort’ type. |
|
Int32 |
AMQP ‘int’ type. |
|
UInt32 |
AMQP ‘uint’ type. |
|
Int64 |
AMQP ‘long’ type. |
|
UInt64 |
AMQP ‘ulong’ type. |
|
Float |
AMQP ‘float’ type. |
|
Double |
AMQP ‘double’ type. |
|
String |
AMQP ‘string’ type. |
|
ByteString |
AMQP ‘binary’ type. |
|
DateTime |
AMQP ‘timestamp’ type. This conversion may result in loss of precision on some platforms. The rules for dealing with the loss of precision are described in OPC 10000-6. |
|
Guid |
AMQP ‘uuid’ type. |
|
QualifiedName |
The QualifiedName is encoded as an AMQP ‘string’ type using the QualifiedName String encoding defined in OPC 10000-6. |
|
LocalizedText |
Not supported and the related field is discarded. |
|
NodeId |
If the NamespaceIndex = 0 the value is encoded as an AMQP ‘string’ type using the format for a NodeId defined in OPC 10000-6. If the NamespaceIndex > 0 the value is converted to an ExpandedNodeId with a NamespaceUri and is encoded as an AMQP ‘string’ type using the format for an ExpandedNodeId defined in OPC 10000-6. |
|
ExpandedNodeId |
If the NamespaceUri is not provided the rules for the NodeId are used. If the NamespaceUri is provided the value is encoded as an AMQP ‘string’ type using the format for an ExpandedNodeId defined in OPC 10000-6. |
|
StatusCode |
AMQP ‘uint’ type. |
|
Variant |
If the value has a supported datatype it uses that conversion; otherwise it is not supported and the related field is discarded. |
|
Structure |
Not supported and the related field is discarded. |
|
Structure with option fields |
Not supported and the related field is discarded. |
|
Array |
Not supported and the related field is discarded. |
|
Union |
Not supported and the related field is discarded. |
The message body is encoded in the AMQP bare-message application-data section as an AMQP ‘binary’ value.
A JSON body is encoded as defined for the JSON message mapping defined in 7.2.4.6.9.
The corresponding MIME type is application/json.
A UADP body is encoded as defined for the UADP message mapping defined in 7.2.3.
The corresponding MIME type is application/opcua+uadp.
If the encoded AMQP message size exceeds the Broker limits it is broken into multiple chunks as described in 7.2.4.4.4.